package org.eocencle.magnet.spark1.component;

import kafka.serializer.StringDecoder;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.eocencle.magnet.core.component.*;
import org.eocencle.magnet.core.context.ComponentFactory;
import org.eocencle.magnet.core.context.Context;
import org.eocencle.magnet.core.mapping.InfoParam;
import org.eocencle.magnet.core.util.CoreTag;
import org.eocencle.magnet.core.util.StrictMap;
import org.eocencle.magnet.spark1.util.SparkUtil;
import scala.Tuple2;

import java.io.Serializable;
import java.util.*;

/**
 * Spark streaming work stage: consumes a Kafka topic stream via a direct
 * (receiver-less) DStream and, for each micro-batch, materializes the records
 * as an RDD/DataFrame pair, registers a temp table and executes the child
 * work-stage components.
 *
 * @author huan
 * @date 2020-02-02
 */
public class SparkStreamWorkStage extends StreamWorkStage implements Serializable {

    @Override
    public void initHandler(WorkStageHandler handler) {
        // No handler initialization is required for this stage.
    }

    /**
     * Builds the Kafka direct stream and wires the per-batch processing loop.
     *
     * @param parameter execution parameter carrying the runtime {@link Context}
     * @return always {@code null}: a streaming stage produces no synchronous
     *         result — each batch's output is published to the composite via
     *         {@code changeLastResult} before the child components run
     */
    @Override
    public List<WorkStageResult> execute(WorkStageParameter parameter) {
        Context context = parameter.getContext();
        JavaStreamingContext streamingContext = (JavaStreamingContext) context.getStreamContext();

        // Topics: comma-separated list taken from the stream configuration.
        Set<String> topics = new HashSet<>(Arrays.asList(this.streamInfo.getTopics().split(CoreTag.STRING_COMMA)));

        // Kafka parameters copied from the configured InfoParam entries.
        StrictMap<String> kafkaConfig = new StrictMap<>("kafka config");
        for (Map.Entry<String, InfoParam> entry : this.streamInfo.getKafkaConfig().entrySet()) {
            // Scoped to the loop iteration (was hoisted outside the loop).
            InfoParam param = entry.getValue();
            kafkaConfig.put(param.getKey(), param.getValue());
        }

        // Create the direct Kafka stream of (key, value) String pairs.
        JavaPairInputDStream<String, String> dStream = KafkaUtils.createDirectStream(streamingContext, String.class,
                String.class, StringDecoder.class, StringDecoder.class, kafkaConfig, topics);

        // Per micro-batch: keep only the message values, then process the batch.
        dStream.map((Tuple2<String, String> tuple2) -> tuple2._2).foreachRDD((JavaRDD<String> line) -> {
            ComponentFactory factory = WorkStageComponentBuilderAssistant.getFactory();

            // Split each line on the configured separator into a Row RDD.
            JavaRDD<Row> rdd = SparkUtil.createRDD(line, streamInfo.getSeparator(), streamInfo.getFields());
            // Wrap the RDD in a DataFrame using the configured field schema.
            DataFrame df = SparkUtil.createDataFrame((SQLContext) context.getSQLContext(), streamInfo.getFields(), rdd);

            SparkWorkStageResult result = (SparkWorkStageResult) factory.createWorkStageResult();
            result.setId(streamInfo.getId());
            result.setAlias(streamInfo.getAlias());
            result.setRdd(rdd);
            result.setDf(df);

            // Register this batch under a collision-free (mixed) table name so
            // downstream SQL components can query it.
            WorkStageComposite parent = getParent();
            String id = streamInfo.getId();
            String idName = parent.getMixedTableName(id);
            parent.putTableName(id, idName);

            df.registerTempTable(idName);

            // Publish this batch's result, then run every child component.
            parent.changeLastResult(result);

            for (WorkStageComponent component : components) {
                component.execute(parameter);
            }
        });

        // Intentionally null: results flow through changeLastResult per batch,
        // not through this synchronous return value.
        return null;
    }

    /**
     * Appends a child component to this stage.
     *
     * @param component the component to add
     */
    @Override
    public void add(WorkStageComponent component) {
        this.components.add(component);
        // NOTE(review): the child's parent is set to THIS stage's parent, not
        // this stage itself — confirm this is intentional for the composite tree.
        component.setParent(this.getParent());
    }
}
